package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.atguigu.bean.UserRegisterBean;
import com.atguigu.common.Constant;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.DorisUtil;
import com.atguigu.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

//Data flow: web/app -> Nginx -> business server (MySQL) -> Maxwell -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Doris
//Programs:  Mock -> MySQL -> Maxwell -> Kafka (ZK) -> Dwd09_BaseDbApp -> Kafka (ZK) -> Dws05_UserUserRegisterWindow -> Doris
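//This job counts new user registrations per 10-second event-time tumbling window and
//writes the result to the Doris table dws_user_user_register_window.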
public class Dws05_UserUserRegisterWindow {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  //parallelism 1 for this demo

        //1.1 Enable checkpointing
        env.enableCheckpointing(10000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(20000L);
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //checkpointConfig.setCheckpointInterval(10000L);  //redundant: enableCheckpointing(10000L) already sets the interval
        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
        //Note: a non-zero min pause between checkpoints implies at most one concurrent checkpoint
        checkpointConfig.setMaxConcurrentCheckpoints(2);
        //Restart at most 3 times, 5s apart; the default attempt count is Integer.MAX_VALUE
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
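        //HashMapStateBackend keeps working state on the TaskManager heap;
        //checkpoints snapshot it to the HDFS storage configured above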
        env.setStateBackend(new HashMapStateBackend());

        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //2. Read the user-register topic from the Kafka DWD layer
        DataStreamSource<String> kafkaDS = env.fromSource(
                KafkaUtil.getKafkaSource(Constant.TOPIC_DWD_USER_REGISTER, "user_register_230315"),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");
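        /* A minimal sketch of what the KafkaUtil.getKafkaSource helper presumably builds
         * (broker list and offset strategy are assumptions; the real utility in
         * com.atguigu.utils may differ, e.g. by using a null-tolerant deserializer):
         *
         * KafkaSource.<String>builder()
         *         .setBootstrapServers("hadoop102:9092")           //assumed broker list
         *         .setTopics(topic)
         *         .setGroupId(groupId)
         *         .setStartingOffsets(OffsetsInitializer.latest()) //assumed offset strategy
         *         .setValueOnlyDeserializer(new SimpleStringSchema())
         *         .build();
         */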

        //3. Filter out empty messages and convert each record to a JavaBean
        SingleOutputStreamOperator<UserRegisterBean> userRegisterDS = kafkaDS.flatMap(new FlatMapFunction<String, UserRegisterBean>() {
            @Override
            public void flatMap(String value, Collector<UserRegisterBean> out) throws Exception {
                if (!"".equals(value)) {
                    JSONObject jsonObject = JSON.parseObject(value);
                    //create_time example: "2023-08-29 09:24:06"
                    String createTime = jsonObject.getString("create_time");
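                    //stt/edt are left as "" placeholders to be filled in by the window function;
                    //each record counts as 1 registration, and ts (epoch ms) drives event time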
                    out.collect(new UserRegisterBean("",
                            "",
                            createTime.split(" ")[0],
                            1L,
                            DateFormatUtil.toTs(createTime, true)));
                }
            }
        });

        //4. Extract the event time (bounded out-of-orderness, 2s), then aggregate over 10s tumbling windows
        SingleOutputStreamOperator<UserRegisterBean> resultDS = userRegisterDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<UserRegisterBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserRegisterBean>() {
                            @Override
                            public long extractTimestamp(UserRegisterBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        }))
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
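                //Two-phase aggregation: the ReduceFunction pre-aggregates incrementally as
                //records arrive, so the AllWindowFunction receives one element per window
                //and only has to stamp it with the window start/end times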
                .reduce(new ReduceFunction<UserRegisterBean>() {
                    @Override
                    public UserRegisterBean reduce(UserRegisterBean value1, UserRegisterBean value2) throws Exception {
                        value1.setRegisterCt(value1.getRegisterCt() + value2.getRegisterCt());
                        return value1;
                    }
                }, new AllWindowFunction<UserRegisterBean, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserRegisterBean> values, Collector<UserRegisterBean> out) throws Exception {
                        UserRegisterBean next = values.iterator().next();
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        out.collect(next);
                    }
                });

        //5. Print for debugging, then write the data to Doris
        resultDS.print(">>>>>>>>");
        resultDS.map(bean -> {
                    SerializeConfig config = new SerializeConfig();
                    config.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;  //serialize property names as snake_case to match the Doris column names
                    return JSON.toJSONString(bean, config);
                })
                .sinkTo(DorisUtil.getDorisSink("dws_user_user_register_window"));
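        /* A minimal sketch of what the DorisUtil.getDorisSink helper presumably builds with
         * the Flink Doris connector (FE address, database name, credentials, and stream-load
         * properties are assumptions; the real utility may differ):
         *
         * Properties props = new Properties();
         * props.setProperty("format", "json");          //rows arrive as JSON strings
         * props.setProperty("read_json_by_line", "true");
         * DorisSink.<String>builder()
         *         .setDorisOptions(DorisOptions.builder()
         *                 .setFenodes("hadoop102:8030") //assumed FE address
         *                 .setTableIdentifier("gmall." + tableName)
         *                 .setUsername("root")
         *                 .setPassword("")
         *                 .build())
         *         .setDorisExecutionOptions(DorisExecutionOptions.builder()
         *                 .setLabelPrefix(tableName + "-" + System.currentTimeMillis())
         *                 .setStreamLoadProp(props)
         *                 .build())
         *         .setSerializer(new SimpleStringSerializer())
         *         .build();
         */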

        //6. Start the job
        env.execute("Dws05_UserUserRegisterWindow");

    }

}
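
//A minimal sketch of the assumed shape of com.atguigu.bean.UserRegisterBean, inferred from
//the positional constructor call above; the real bean (likely Lombok-generated, with the
//getters/setters that fastjson serialization relies on) may differ.
class UserRegisterBeanSketch {
    String stt;      //window start time, filled in by the window function
    String edt;      //window end time, filled in by the window function
    String curDate;  //registration date, taken from create_time
    Long registerCt; //number of registrations in the window
    Long ts;         //event timestamp in milliseconds, used for watermarks

    UserRegisterBeanSketch(String stt, String edt, String curDate, Long registerCt, Long ts) {
        this.stt = stt;
        this.edt = edt;
        this.curDate = curDate;
        this.registerCt = registerCt;
        this.ts = ts;
    }
}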
